{
 "cells": [
  {
   "cell_type": "markdown",
   "source": [
    "实验环境初始化"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "source": [
    "import os\r\n",
    "import time\r\n",
    "from concurrent.futures import ThreadPoolExecutor, as_completed\r\n",
    "from boto3.session import Session\r\n",
    "import botocore\r\n",
    "from tqdm import tqdm\r\n",
    "import throttle\r\n",
    "\r\n",
    "# Credentials for the local lab S3 service. The lab defaults below can be\r\n",
    "# overridden through environment variables, so other keys never have to be\r\n",
    "# written into the notebook itself.\r\n",
    "aws_access_key_id = os.environ.get('OBS_LAB_ACCESS_KEY_ID', 'hust')\r\n",
    "aws_secret_access_key = os.environ.get('OBS_LAB_SECRET_ACCESS_KEY', 'hust_obs')\r\n",
    "\r\n",
    "# Endpoint of the local S3 service (same override mechanism)\r\n",
    "local_s3 = os.environ.get('OBS_LAB_ENDPOINT', 'http://192.168.33.1:9000')\r\n",
    "\r\n",
    "# Create a session that carries the credentials\r\n",
    "session = Session(aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)\r\n",
    "\r\n",
    "# Connect to the service\r\n",
    "s3 = session.resource('s3', endpoint_url=local_s3)"
   ],
   "outputs": [],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "查看所有bucket"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "source": [
    "# Print the name of every bucket visible to these credentials.\r\n",
    "# The loop variable is deliberately not called `bucket`: that name is used by\r\n",
    "# later cells for the experiment bucket (including the cell that deletes it),\r\n",
    "# so it must never end up pointing at an unrelated bucket listed here.\r\n",
    "for existing_bucket in s3.buckets.all():\r\n",
    "    print('bucket name:%s' % existing_bucket.name)"
   ],
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "bucket name:loadgen\n",
      "bucket name:test001\n"
     ]
    }
   ],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "新建一个实验用 bucket (注意：\"bucket name\" 中不能有下划线)"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "source": [
    "bucket_name = 'test100objs'\r\n",
    "# Create the experiment bucket only when no existing bucket has that name\r\n",
    "existing_names = {b.name for b in s3.buckets.all()}\r\n",
    "if bucket_name not in existing_names:\r\n",
    "    s3.create_bucket(Bucket=bucket_name)"
   ],
   "outputs": [],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "查看此 bucket 下的所有 object (若之前实验没有正常结束，则不为空)"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "source": [
    "bucket = s3.Bucket(bucket_name)\r\n",
    "# Show the key of every object currently stored in the experiment bucket\r\n",
    "for stored_obj in bucket.objects.all():\r\n",
    "    print('obj name:%s' % (stored_obj.key,))"
   ],
   "outputs": [],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "准备负载，可以按照几种不同的请求到达率设置 (到达率是平均请求到达间隔 Inter-Arrival Time, IAT 的倒数)。"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "source": [
    "# Create the local data file that every request uploads\r\n",
    "local_file = \"_test_4K.bin\"\r\n",
    "test_bytes = [0xFF] * (1024 * 4) # pad to the required size (4096 bytes of 0xFF)\r\n",
    "\r\n",
    "with open(local_file, \"wb\") as payload:\r\n",
    "    payload.write(bytearray(test_bytes))\r\n",
    "\r\n",
    "# Issue one request and measure how long it stays in the system\r\n",
    "def request_timing(s3res, i): # each call gets its own session.resource for thread safety\r\n",
    "    \"\"\"Upload the local test file as object number `i` and time the call.\r\n",
    "\r\n",
    "    Args:\r\n",
    "        s3res: S3 service resource used for this request.\r\n",
    "        i: request index; it becomes the zero-padded suffix of the object key.\r\n",
    "\r\n",
    "    Returns:\r\n",
    "        Elapsed time of the upload call in milliseconds.\r\n",
    "    \"\"\"\r\n",
    "    obj_name = \"testObj%08d\"%(i,) # key of the object to create\r\n",
    "    # perf_counter() is monotonic and high-resolution, so the measured interval\r\n",
    "    # is not distorted by wall-clock adjustments the way time.time() can be\r\n",
    "    start = time.perf_counter()\r\n",
    "    s3res.Object(bucket_name, obj_name).upload_file(local_file) # upload the local file as the object\r\n",
    "    # Alternatives for other experiments:\r\n",
    "    #   upload:   bucket.put_object(Key=obj_name, Body=open(local_file, 'rb'))\r\n",
    "    #   download: s3res.Object(bucket_name, obj_name).download_file('.tempfile')\r\n",
    "    end = time.perf_counter()\r\n",
    "    return (end - start) * 1000 # convert seconds to milliseconds\r\n",
    "\r\n",
    "# Run and track requests under an arrival-rate limit\r\n",
    "def arrival_rate_max(s3res, i): # no rate limiting\r\n",
    "    \"\"\"Forward the request to `request_timing` without any throttling.\"\"\"\r\n",
    "    elapsed_ms = request_timing(s3res, i)\r\n",
    "    return elapsed_ms\r\n",
    "\r\n",
    "@throttle.wrap(0.1, 2) # at most 2 requests per 100 ms\r\n",
    "def arrival_rate_2(s3res, i):\r\n",
    "    \"\"\"Forward the request to `request_timing` under the 2-per-100-ms limit.\"\"\"\r\n",
    "    elapsed_ms = request_timing(s3res, i)\r\n",
    "    return elapsed_ms\r\n",
    "\r\n",
    "@throttle.wrap(0.1, 4) # at most 4 requests per 100 ms\r\n",
    "def arrival_rate_4(s3res, i):\r\n",
    "    \"\"\"Forward the request to `request_timing` under the 4-per-100-ms limit.\"\"\"\r\n",
    "    elapsed_ms = request_timing(s3res, i)\r\n",
    "    return elapsed_ms\r\n",
    "\r\n",
    "@throttle.wrap(0.1, 8) # at most 8 requests per 100 ms\r\n",
    "def arrival_rate_8(s3res, i):\r\n",
    "    \"\"\"Forward the request to `request_timing` under the 8-per-100-ms limit.\"\"\"\r\n",
    "    elapsed_ms = request_timing(s3res, i)\r\n",
    "    return elapsed_ms"
   ],
   "outputs": [],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "按照预设IAT发起请求"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "source": [
    "latency = []          # latency (ms) of every request that completed successfully\r\n",
    "failed_requests = []  # indices of the requests that raised an exception\r\n",
    "total_requests = 100  # number of upload tasks, and therefore of progress-bar steps\r\n",
    "\r\n",
    "with tqdm(desc=\"Accessing S3\", total=total_requests) as pbar:\r\n",
    "    with ThreadPoolExecutor(max_workers=1) as executor: # max_workers sets the number of concurrent threads\r\n",
    "        # Map every future to its request index so that a failure can be traced\r\n",
    "        # back to its request (a plain list cannot be indexed by a Future object).\r\n",
    "        futures = {\r\n",
    "            executor.submit(\r\n",
    "                arrival_rate_max,\r\n",
    "                session.resource('s3', endpoint_url=local_s3), i): i # a fresh resource per task, for thread safety\r\n",
    "            for i in range(total_requests)\r\n",
    "        }\r\n",
    "        for future in as_completed(futures):\r\n",
    "            if future.exception() is not None:\r\n",
    "                failed_requests.append(futures[future]) # remember which request failed\r\n",
    "            else:\r\n",
    "                latency.append(future.result()) # completed successfully: record its latency\r\n",
    "            pbar.update(1)"
   ],
   "outputs": [
    {
     "output_type": "stream",
     "name": "stderr",
     "text": [
      "Accessing S3: 100%|██████████| 100/100 [00:03<00:00, 29.25it/s]\n"
     ]
    }
   ],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "清理实验环境"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "source": [
    "try:\r\n",
    "    # Delete every object in the bucket first ...\r\n",
    "    bucket.objects.filter().delete()\r\n",
    "\r\n",
    "    # To delete only selected objects, filter by key prefix instead:\r\n",
    "    # bucket.objects.filter(Prefix=obj_name).delete()\r\n",
    "\r\n",
    "    # ... then delete the bucket itself\r\n",
    "    bucket.delete()\r\n",
    "except botocore.exceptions.ClientError as e:\r\n",
    "    # Report the underlying cause, not only the fact that removal failed\r\n",
    "    print('error in bucket removal')\r\n",
    "    print(e)"
   ],
   "outputs": [],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "删除本地测试文件"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "source": [
    "# Remove the local test file; tolerate it already being gone so that\r\n",
    "# re-running this cell does not raise FileNotFoundError\r\n",
    "if os.path.exists(local_file):\r\n",
    "    os.remove(local_file)"
   ],
   "outputs": [],
   "metadata": {}
  },
  {
   "cell_type": "markdown",
   "source": [
    "记录延迟到CSV文件"
   ],
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "source": [
    "# Dump the collected latencies as a single-column CSV with a header row\r\n",
    "with open(\"latency.csv\", \"w+\") as tracefile:\r\n",
    "    tracefile.write(\"latency\\n\")\r\n",
    "    for sample in latency:\r\n",
    "        tracefile.write(str(sample) + '\\n')"
   ],
   "outputs": [],
   "metadata": {}
  }
 ],
 "metadata": {
  "orig_nbformat": 4,
  "language_info": {
   "name": "python",
   "version": "3.9.7",
   "mimetype": "text/x-python",
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "pygments_lexer": "ipython3",
   "nbconvert_exporter": "python",
   "file_extension": ".py"
  },
  "kernelspec": {
   "name": "python3",
   "display_name": "Python 3.9.7 64-bit ('lab': conda)"
  },
  "interpreter": {
   "hash": "f487011d6c19380a0f12fb28ca8659fdd2afd3ccddcfad5755f4bb9819179fc2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}